import atexit
import json
import os
import signal
import threading
import uuid
from datetime import timedelta
from typing import Dict, List, Optional

import pandas as pd
from couchbase_columnar.cluster import Cluster
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions

from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase import (
    CouchbaseColumnarOfflineStoreConfig,
)
from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source import (
    CouchbaseColumnarLoggingDestination,
    CouchbaseColumnarSource,
)
from feast.infra.utils.couchbase.couchbase_utils import normalize_timestamp
from feast.repo_config import FeastConfigBaseModel
from tests.integration.feature_repos.universal.data_source_creator import (
    DataSourceCreator,
)

# Database and scope names used to qualify every collection this module
# creates, queries, or drops.
COUCHBASE_COLUMNAR_DATABASE = "Default"
COUCHBASE_COLUMNAR_SCOPE = "Default"


class CouchbaseColumnarDataSourceCreator(DataSourceCreator):
    """Create and clean up Couchbase Columnar collections for integration tests.

    Each instance records the names of the collections it creates in
    ``self.collections`` so that ``teardown`` can drop them again. A single
    cluster connection is shared by all instances through ``get_cluster``;
    connection settings are read from the
    ``COUCHBASE_COLUMNAR_CONNECTION_STRING``, ``COUCHBASE_COLUMNAR_USER`` and
    ``COUCHBASE_COLUMNAR_PASSWORD`` environment variables.
    """

    # Timeout (seconds) used for the cluster dispatch timeout and for the
    # offline store config, which in turn bounds each query issued here.
    _TIMEOUT_SECONDS = 120

    # True while cleanup_all() is running, so nested invocations return early.
    _shutting_down = False
    # Lazily created cluster connection shared by every instance.
    _cluster = None
    # Guards creation of _cluster. Re-entrant on purpose: cleanup_all() is
    # invoked from this module's signal handler, which runs in the main thread
    # and may interrupt that same thread while it is inside get_cluster(). A
    # plain Lock would then block on itself.
    _cluster_lock = threading.RLock()

    @classmethod
    def get_cluster(cls):
        """Return the shared cluster connection, creating it on first use.

        Raises:
            KeyError: If one of the ``COUCHBASE_COLUMNAR_*`` environment
                variables is not set.
        """
        with cls._cluster_lock:
            if cls._cluster is None:
                credential = Credential.from_username_and_password(
                    os.environ["COUCHBASE_COLUMNAR_USER"],
                    os.environ["COUCHBASE_COLUMNAR_PASSWORD"],
                )
                timeout_opts = TimeoutOptions(
                    dispatch_timeout=timedelta(seconds=cls._TIMEOUT_SECONDS)
                )
                cls._cluster = Cluster.create_instance(
                    os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
                    credential,
                    ClusterOptions(timeout_options=timeout_opts),
                )
            return cls._cluster

    def __init__(self, project_name: str, **kwargs):
        """Build the offline store config from the environment.

        Args:
            project_name: Prefix applied to every collection created by this
                instance.
            **kwargs: Accepted for signature compatibility; not used here.

        Raises:
            KeyError: If one of the ``COUCHBASE_COLUMNAR_*`` environment
                variables is not set.
        """
        super().__init__(project_name)
        self.project_name = project_name
        # Names of the collections to drop in teardown().
        self.collections: List[str] = []

        self.offline_store_config = CouchbaseColumnarOfflineStoreConfig(
            type="couchbase.offline",
            connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
            user=os.environ["COUCHBASE_COLUMNAR_USER"],
            password=os.environ["COUCHBASE_COLUMNAR_PASSWORD"],
            timeout=self._TIMEOUT_SECONDS,
        )

    @staticmethod
    def _qualified_name(collection_name: str) -> str:
        """Return the backtick-quoted ``database.scope.collection`` identifier."""
        return (
            f"`{COUCHBASE_COLUMNAR_DATABASE}`."
            f"`{COUCHBASE_COLUMNAR_SCOPE}`."
            f"`{collection_name}`"
        )

    def _query_options(self) -> QueryOptions:
        """Return query options carrying the configured offline store timeout."""
        return QueryOptions(
            timeout=timedelta(seconds=self.offline_store_config.timeout)
        )

    @staticmethod
    def _format_row(row: pd.Series) -> dict:
        """Convert one DataFrame row into a dictionary for ``json.dumps``.

        ``pd.Timestamp`` values are passed through ``normalize_timestamp``,
        missing scalar values become ``None``, and every other value is kept
        unchanged.
        """
        formatted = {}
        for column, value in row.items():
            if isinstance(value, pd.Timestamp):
                formatted[column] = normalize_timestamp(value)
            elif pd.api.types.is_scalar(value) and pd.isna(value):
                # The is_scalar guard matters: for list-like values pd.isna()
                # returns an array, which cannot be used as a condition.
                formatted[column] = None
            else:
                formatted[column] = value
        return formatted

    def create_data_source(
        self,
        df: pd.DataFrame,
        destination_name: str,
        created_timestamp_column: str = "created_ts",
        field_mapping: Optional[Dict[str, str]] = None,
        timestamp_field: Optional[str] = "ts",
    ) -> DataSource:
        """Create a collection, load ``df`` into it and return its data source.

        Args:
            df: Rows to insert; each row becomes one JSON document.
            destination_name: Collection name suffix; the project name is
                prepended via ``get_prefixed_collection_name``.
            created_timestamp_column: Name of the created-timestamp column.
            field_mapping: Source-to-feature field mapping. Falls back to
                ``{"ts_1": "ts"}`` when not provided (or empty).
            timestamp_field: Name of the event timestamp column.

        Returns:
            A ``CouchbaseColumnarSource`` that selects from the new collection.
        """
        collection_name = self.get_prefixed_collection_name(destination_name)
        qualified_name = self._qualified_name(collection_name)

        # Quote the identifier here exactly like the INSERT/DROP statements do,
        # so all statements agree on how the collection name is interpreted.
        create_collection_query = (
            f"CREATE ANALYTICS COLLECTION {qualified_name} "
            "IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;"
        )
        self.get_cluster().execute_query(
            create_collection_query, self._query_options()
        )

        documents = [json.dumps(self._format_row(row)) for _, row in df.iterrows()]
        if documents:  # nothing to insert for an empty DataFrame
            values_clause = ",\n    ".join(documents)
            insert_query = f"INSERT INTO {qualified_name} ([\n    {values_clause}\n])"
            self.get_cluster().execute_query(insert_query, self._query_options())

        self.collections.append(collection_name)

        return CouchbaseColumnarSource(
            name=collection_name,
            query=f"SELECT VALUE v FROM {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.`{collection_name}` v",
            database=COUCHBASE_COLUMNAR_DATABASE,
            scope=COUCHBASE_COLUMNAR_SCOPE,
            collection=collection_name,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            field_mapping=field_mapping or {"ts_1": "ts"},
        )

    def create_saved_dataset_destination(self):
        """Not supported by this creator; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def create_logged_features_destination(self) -> LoggingDestination:
        """Return a logging destination backed by a uniquely named collection.

        The generated collection name is recorded so ``teardown`` drops it.
        """
        collection = self.get_prefixed_collection_name(
            f"logged_features_{str(uuid.uuid4()).replace('-', '_')}"
        )
        self.collections.append(collection)
        return CouchbaseColumnarLoggingDestination(
            table_name=collection,
            database=COUCHBASE_COLUMNAR_DATABASE,
            scope=COUCHBASE_COLUMNAR_SCOPE,
        )

    def create_offline_store_config(self) -> FeastConfigBaseModel:
        """Return the offline store config built in ``__init__``."""
        return self.offline_store_config

    def get_prefixed_collection_name(self, suffix: str) -> str:
        """Return ``suffix`` prefixed with the project name and an underscore."""
        return f"{self.project_name}_{suffix}"

    @classmethod
    def get_dangling_collections(cls) -> List[str]:
        """Return the qualified names of leftover test collections.

        Looks up, in ``System.Metadata``, every dataset outside the
        ``Metadata`` dataverse whose name matches ``integration_test_.*`` or
        ``feast_entity_df_.*``. Each result has the form
        ``DatabaseName.DataverseName.DatasetName``.

        Returns:
            The matching names, or an empty list if the lookup fails (the
            error is printed, not raised).
        """
        query = """
               SELECT VALUE d.DatabaseName || '.' || d.DataverseName || '.' || d.DatasetName
               FROM System.Metadata.`Dataset` d
               WHERE d.DataverseName <> "Metadata"
               AND (REGEXP_CONTAINS(d.DatasetName, "integration_test_.*")
                    OR REGEXP_CONTAINS(d.DatasetName, "feast_entity_df_.*"));
           """
        try:
            res = cls.get_cluster().execute_query(query)
            return res.get_all_rows()
        except Exception as e:
            # Best effort: cleanup must never fail the caller.
            print(f"Error fetching collections: {e}")
            return []

    @classmethod
    def cleanup_all(cls):
        """Drop every collection reported by ``get_dangling_collections``.

        Best effort: a failure to drop one collection is printed and the loop
        continues. Calls made while a cleanup is already in progress return
        immediately.
        """
        if cls._shutting_down:
            return
        cls._shutting_down = True
        try:
            collections = cls.get_dangling_collections()
            if not collections:
                print("No collections to clean up.")
                return

            print(f"Found {len(collections)} collections to clean up.")
            if len(collections) > 5:
                print("This may take a few minutes...")
            for collection in collections:
                try:
                    query = f"DROP COLLECTION {collection} IF EXISTS;"
                    cls.get_cluster().execute_query(query)
                    print(f"Dropped collection: {collection}")
                except Exception as e:
                    print(f"Error dropping collection {collection}: {e}")
        finally:
            print("Cleanup complete.")
            cls._shutting_down = False

    def teardown(self):
        """Drop every collection recorded by this instance.

        Errors are printed per collection and do not stop the remaining drops.
        """
        for collection in self.collections:
            query = f"DROP COLLECTION {self._qualified_name(collection)} IF EXISTS;"
            try:
                self.get_cluster().execute_query(query, self._query_options())
                print(f"Successfully dropped collection: {collection}")
            except Exception as e:
                print(f"Error dropping collection {collection}: {e}")


def cleanup_handler(signum, frame):
    """Signal handler that cleans up dangling collections, then interrupts.

    Runs ``CouchbaseColumnarDataSourceCreator.cleanup_all()`` and prints (but
    does not propagate) any ``Exception`` it raises. Afterwards control is
    always handed to ``signal.default_int_handler``, which raises
    ``KeyboardInterrupt`` so the process unwinds.

    Args:
        signum: Number of the signal being handled.
        frame: Stack frame that was interrupted by the signal.
    """
    print("\nCleaning up dangling resources...")
    try:
        try:
            CouchbaseColumnarDataSourceCreator.cleanup_all()
        except Exception as cleanup_error:
            print(f"Error during cleanup: {cleanup_error}")
    finally:
        # Always end by delegating to Python's default SIGINT behaviour,
        # whether or not the cleanup succeeded.
        signal.default_int_handler(signum, frame)


# Install the cleanup hooks at import time: cleanup_handler for SIGINT and
# SIGTERM, and cleanup_all as an interpreter-exit hook.
for _cleanup_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_cleanup_signal, cleanup_handler)
del _cleanup_signal  # keep the loop variable out of the module namespace
atexit.register(CouchbaseColumnarDataSourceCreator.cleanup_all)
